[[message-listener-container]]
= Message Listener Containers

Two `MessageListenerContainer` implementations are provided:

* `KafkaMessageListenerContainer`
* `ConcurrentMessageListenerContainer`

The `KafkaMessageListenerContainer` receives all message from all topics or partitions on a single thread.
The `ConcurrentMessageListenerContainer` delegates to one or more `KafkaMessageListenerContainer` instances to provide multi-threaded consumption.

Starting with version 2.2.7, you can add a `RecordInterceptor` to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record.
If the interceptor returns null, the listener is not called.
Starting with version 2.7, it has additional methods which are called after the listener exits (normally, or by throwing an exception).
Also, starting with version 2.7, there is now a `BatchInterceptor`, providing similar functionality for xref:kafka/receiving-messages/listener-annotation.adoc#batch-listeners[Batch Listeners].
In addition, the `ConsumerAwareRecordInterceptor` (and `BatchInterceptor`) provide access to the `Consumer<?, ?>`.
This might be used, for example, to access the consumer metrics in the interceptor.

IMPORTANT: You should not execute any methods that affect the consumer's positions and or committed offsets in these interceptors; the container needs to manage such information.

IMPORTANT: If the interceptor mutates the record (by creating a new one), the `topic`, `partition`, and `offset` must remain the same to avoid unexpected side effects such as record loss.

The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors.

By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started.
You can set the listener container's `interceptBeforeTx` property to `false` to invoke the interceptor after the transaction has started instead.
Starting with version 2.9, this will apply to any transaction manager, not just `KafkaAwareTransactionManager`+++s+++.
This allows, for example, the interceptor to participate in a JDBC transaction started by the container.

Starting with versions 2.3.8, 2.4.6, the `ConcurrentMessageListenerContainer` now supports https://kafka.apache.org/documentation/#static_membership[Static Membership] when the concurrency is greater than one.
The `group.instance.id` is suffixed with `-n` with `n` starting at `1`.
This, together with an increased `session.timeout.ms`, can be used to reduce rebalance events, for example, when application instances are restarted.

[[kafka-container]]
== Using `KafkaMessageListenerContainer`

The following constructor is available:

[source, java]
----
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)
----

It receives a `ConsumerFactory` and information about topics and partitions, as well as other configuration, in a `ContainerProperties`
object.
`ContainerProperties` has the following constructors:

[source, java]
----
public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)
----

The first constructor takes an array of `TopicPartitionOffset` arguments to explicitly instruct the container about which partitions to use (using the consumer `assign()` method) and with an optional initial offset.
A positive value is an absolute offset by default.
A negative value is relative to the current last offset within a partition by default.
A constructor for `TopicPartitionOffset` that takes an additional `boolean` argument is provided.
If this is `true`, the initial offsets (positive or negative) are relative to the current position for this consumer.
The offsets are applied when the container is started.
The second takes an array of topics, and Kafka allocates the partitions based on the `group.id` property -- distributing partitions across the group.
The third uses a regex `Pattern` to select the topics.

To assign a `MessageListener` to a container, you can use the `ContainerProps.setMessageListener` method when creating the Container.
The following example shows how to do so:

[source, java]
----
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
----

Note that when creating a `DefaultKafkaConsumerFactory`, using the constructor that just takes in the properties as above means that key and value `Deserializer` classes are picked up from configuration.
Alternatively, `Deserializer` instances may be passed to the `DefaultKafkaConsumerFactory` constructor for key and/or value, in which case all Consumers share the same instances.
Another option is to provide `Supplier<Deserializer>` s (starting with version 2.3) that will be used to obtain separate `Deserializer` instances for each `Consumer`:

[source, java]
----

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
----

Refer to the https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.html[Javadoc] for `ContainerProperties` for more information about the various properties that you can set.

Since version 2.1.1, a new property called `logContainerConfig` is available.
When `true` and `INFO` logging is enabled each listener container writes a log message summarizing its configuration properties.

By default, logging of topic offset commits is performed at the `DEBUG` logging level.
Starting with version 2.1.2, a property in `ContainerProperties` called `commitLogLevel` lets you specify the log level for these messages.
For example, to change the log level to `INFO`, you can use `containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);`.

Starting with version 2.2, a new container property called `missingTopicsFatal` has been added (default: `false` since 2.3.4).
This prevents the container from starting if any of the configured topics are not present on the broker.
It does not apply if the container is configured to listen to a topic pattern (regex).
Previously, the container threads looped within the `consumer.poll()` method waiting for the topic to appear while logging many messages.
Aside from the logs, there was no indication that there was a problem.

As of version 2.8, a new container property `authExceptionRetryInterval` has been introduced.
This causes the container to retry fetching messages after getting any `AuthenticationException` or `AuthorizationException` from the `KafkaConsumer`.
This can happen when, for example, the configured user is denied access to read a certain topic or credentials are incorrect.
Defining `authExceptionRetryInterval` allows the container to recover when proper permissions are granted.

NOTE: By default, no interval is configured - authentication and authorization errors are considered fatal, which causes the container to stop.

Starting with version 2.8, when creating the consumer factory, if you provide deserializers as objects (in the constructor or via the setters), the factory will invoke the `configure()` method to configure them with the configuration properties.

[[using-ConcurrentMessageListenerContainer]]
== Using `ConcurrentMessageListenerContainer`

The single constructor is similar to the `KafkaListenerContainer` constructor.
The following listing shows the constructor's signature:

[source, java]
----
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)
----

It also has a `concurrency` property.
For example, `container.setConcurrency(3)` creates three `KafkaMessageListenerContainer` instances.

For the first constructor, Kafka distributes the partitions across the consumers using its group management capabilities.

[IMPORTANT]
====
When listening to multiple topics, the default partition distribution may not be what you expect.
For example, if you have three topics with five partitions each and you want to use `concurrency=15`, you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle.
This is because the default Kafka `PartitionAssignor` is the `RangeAssignor` (see its Javadoc).
For this scenario, you may want to consider using the `RoundRobinAssignor` instead, which distributes the partitions across all of the consumers.
Then, each consumer is assigned one topic or partition.
To change the `PartitionAssignor`, you can set the `partition.assignment.strategy` consumer property (`ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG`) in the properties provided to the `DefaultKafkaConsumerFactory`.

When using Spring Boot, you can assign set the strategy as follows:

=====
[source]
----
spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor
----
=====
====

When the container properties are configured with `TopicPartitionOffset`+++s+++, the `ConcurrentMessageListenerContainer` distributes the `TopicPartitionOffset` instances across the delegate `KafkaMessageListenerContainer` instances.

If, say, six `TopicPartitionOffset` instances are provided and the `concurrency` is `3`; each container gets two partitions.
For five `TopicPartitionOffset` instances, two containers get two partitions, and the third gets one.
If the `concurrency` is greater than the number of `TopicPartitions`, the `concurrency` is adjusted down such that each container gets one partition.

NOTE: The `client.id` property (if set) is appended with `-n` where `n` is the consumer instance that corresponds to the concurrency.
This is required to provide unique names for MBeans when JMX is enabled.

Starting with version 1.3, the `MessageListenerContainer` provides access to the metrics of the underlying `KafkaConsumer`.
In the case of `ConcurrentMessageListenerContainer`, the `metrics()` method returns the metrics for all the target `KafkaMessageListenerContainer` instances.
The metrics are grouped into the `Map<MetricName, ? extends Metric>` by the `client-id` provided for the underlying `KafkaConsumer`.

Starting with version 2.3, the `ContainerProperties` provides an `idleBetweenPolls` option to let the main loop in the listener container to sleep between `KafkaConsumer.poll()` calls.
An actual sleep interval is selected as the minimum from the provided option and difference between the `max.poll.interval.ms` consumer config and the current records batch processing time.

[[committing-offsets]]
== Committing Offsets

Several options are provided for committing offsets.
If the `enable.auto.commit` consumer property is `true`, Kafka auto-commits the offsets according to its configuration.
If it is `false`, the containers support several `AckMode` settings (described in the next list).
The default `AckMode` is `BATCH`.
Starting with version 2.3, the framework sets `enable.auto.commit` to `false` unless explicitly set in the configuration.
Previously, the Kafka default (`true`) was used if the property was not set.

The consumer `poll()` method returns one or more `ConsumerRecords`.
The `MessageListener` is called for each record.
The following lists describes the action taken by the container for each `AckMode` (when transactions are not being used):

* `RECORD`: Commit the offset when the listener returns after processing the record.
* `BATCH`: Commit the offset when all the records returned by the `poll()` have been processed.
* `TIME`: Commit the offset when all the records returned by the `poll()` have been processed, as long as the `ackTime` since the last commit has been exceeded.
* `COUNT`: Commit the offset when all the records returned by the `poll()` have been processed, as long as `ackCount` records have been received since the last commit.
* `COUNT_TIME`: Similar to `TIME` and `COUNT`, but the commit is performed if either condition is `true`.
* `MANUAL`: The message listener is responsible to `acknowledge()` the `Acknowledgment`.
After that, the same semantics as `BATCH` are applied.
* `MANUAL_IMMEDIATE`: Commit the offset immediately when the `Acknowledgment.acknowledge()` method is called by the listener.

When using xref:kafka/transactions.adoc[transactions], the offset(s) are sent to the transaction and the semantics are equivalent to `RECORD` or `BATCH`, depending on the listener type (record or batch).

NOTE: `MANUAL` and `MANUAL_IMMEDIATE` require the listener to be an `AcknowledgingMessageListener` or a `BatchAcknowledgingMessageListener`.
See xref:kafka/receiving-messages/message-listeners.adoc[Message Listeners].

Depending on the `syncCommits` container property, the `commitSync()` or `commitAsync()` method on the consumer is used.
`syncCommits` is `true` by default; also see `setSyncCommitTimeout`.
See `setCommitCallback` to get the results of asynchronous commits; the default callback is the `LoggingCommitCallback` which logs errors (and successes at debug level).

Because the listener container has its own mechanism for committing offsets, it prefers the Kafka `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG` to be `false`.
Starting with version 2.3, it unconditionally sets it to false unless specifically set in the consumer factory or the container's consumer property overrides.

The `Acknowledgment` has the following method:

[source, java]
----
public interface Acknowledgment {

    void acknowledge();

}
----

This method gives the listener control over when offsets are committed.

Starting with version 2.3, the `Acknowledgment` interface has two additional methods `nack(long sleep)` and `nack(int index, long sleep)`.
The first one is used with a record listener, the second with a batch listener.
Calling the wrong method for your listener type will throw an `IllegalStateException`.

NOTE: If you want to commit a partial batch, using `nack()`, When using transactions, set the `AckMode` to `MANUAL`; invoking `nack()` will send the offsets of the successfully processed records to the transaction.

IMPORTANT: `nack()` can only be called on the consumer thread that invokes your listener.

IMPORTANT: `nack()` is not allowed when using xref:kafka/receiving-messages/ooo-commits.adoc[Out of Order Commits].

With a record listener, when `nack()` is called, any pending offsets are committed, the remaining records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next `poll()`.
The consumer can be paused before redelivery, by setting the `sleep` argument.
This is similar functionality to throwing an exception when the container is configured with a `DefaultErrorHandler`.

IMPORTANT: `nack()` pauses the entire listener for the specified sleep duration including all assigned partitions.

When using a batch listener, you can specify the index within the batch where the failure occurred.
When `nack()` is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next `poll()`.

See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Handlers] for more information.

IMPORTANT: The consumer is paused during the sleep so that we continue to poll the broker to keep the consumer alive.
The actual sleep time, and its resolution, depends on the container's `pollTimeout` which defaults to 5 seconds.
The minimum sleep time is equal to the `pollTimeout` and all sleep times will be a multiple of it.
For small sleep times or, to increase its accuracy, consider reducing the container's `pollTimeout`.

Starting with version 3.0.10, batch listeners can commit the offsets of parts of the batch, using `acknowledge(index)` on the `Acknowledgment` argument.
When this method is called, the offset of the record at the index (as well as all previous records) will be committed.
Calling `acknowledge()` after a partial batch commit is performed will commit the offsets of the remainder of the batch.
The following limitations apply:

* `AckMode.MANUAL_IMMEDIATE` is required
* The method must be called on the listener thread
* The listener must consume a `List` rather than the raw `ConsumerRecords`
* The index must be in the range of the list's elements
* The index must be larger than that used in a previous call

These restrictions are enforced and the method will throw an `IllegalArgumentException` or `IllegalStateException`, depending on the violation.

[[container-auto-startup]]
== Listener Container Auto Startup

The listener containers implement `SmartLifecycle`, and `autoStartup` is `true` by default.
The containers are started in a late phase (`Integer.MAX-VALUE - 100`).
Other components that implement `SmartLifecycle`, to handle data from listeners, should be started in an earlier phase.
The `- 100` leaves room for later phases to enable components to be auto-started after the containers.

